// Copyright 2025 Huawei Cloud Computing Technologies Co., Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package shelf

import (
	"encoding/binary"
	"fmt"
	"io"
	"math"
	"path/filepath"
	"slices"
	"strings"
	"sync"
	"sync/atomic"
	"time"
	"unsafe"

	"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
	"github.com/golang/snappy"
	"github.com/openGemini/openGemini/engine/immutable"
	"github.com/openGemini/openGemini/lib/bufferpool"
	"github.com/openGemini/openGemini/lib/config"
	"github.com/openGemini/openGemini/lib/fileops"
	"github.com/openGemini/openGemini/lib/logger"
	"github.com/openGemini/openGemini/lib/obs"
	"github.com/openGemini/openGemini/lib/record"
	"github.com/openGemini/openGemini/lib/util"
	"github.com/openGemini/openGemini/lib/util/lifted/encoding/lz4"
	"go.uber.org/zap"
)

const (
	walFileSuffixes = "wal"
	maxWalBlockSize = 256 * config.MB

	flagRecord    = 0
	flagSeriesKey = 1

	walCompressNone   = 0
	walCompressLz4    = 1
	walCompressSnappy = 2

	walBufferSize = config.MB
)

var walBlockHeaderSize = int(unsafe.Sizeof(WalBlockHeader{}))
var zeroBlockHeader = make([]byte, walBlockHeaderSize)

type WriterFlusher interface {
	io.Writer
	Flush() error
}

type Wal struct {
	util.Reference

	mu        sync.RWMutex
	closeOnce sync.Once

	// for store record and seriesKey
	file *WalFile

	// record the offset of each series data in the wal file
	seriesOffsets *SeriesOffsets

	// Index not created or loaded into cache.
	// Record the series key, which is used by the background thread to asynchronously create indexes.
	seriesKeyOffsets *SeriesKeyOffsets

	targetMu sync.RWMutex
	// sequence number of the TSSP file generated by this WAL file
	targetTSSP map[uint64]struct{}

	seriesMap *SeriesMap

	timeRange util.TimeRange
	ctx       *WalCtx

	openAt   uint64
	expireAt uint64
	lock     *string
	dir      string

	opened          bool
	backgroundSync  bool
	backgroundFlush bool
	writing         bool

	option *obs.ObsOptions
}

func NewWal(dir string, lock *string, opt *obs.ObsOptions) *Wal {
	wal := &Wal{
		lock:             lock,
		seriesOffsets:    NewSeriesOffsets(),
		seriesKeyOffsets: NewSeriesKeyOffsets(),
		dir:              dir,
		seriesMap:        NewSeriesMap(),
		timeRange: util.TimeRange{
			Min: math.MaxInt64,
			Max: math.MinInt64,
		},
		ctx:             &WalCtx{},
		targetTSSP:      make(map[uint64]struct{}),
		backgroundSync:  conf.ReliabilityLevel < config.ReliabilityLevelHigh,
		backgroundFlush: conf.ReliabilityLevel == config.ReliabilityLevelLow,
		expireAt:        fasttime.UnixTimestamp() + uint64(time.Duration(conf.MaxWalDuration)/time.Second),
		option:          opt,
	}

	return wal
}

func (wal *Wal) SizeLimited() bool {
	return wal.opened && wal.file.WrittenSize() >= int64(conf.MaxWalFileSize)
}

func (wal *Wal) WrittenSize() int64 {
	if !wal.opened {
		return 0
	}
	return wal.file.WrittenSize()
}

func (wal *Wal) NeedCreateIndex() bool {
	return wal.seriesKeyOffsets.Len() > 0
}

func (wal *Wal) Overlaps(min, max int64) bool {
	return wal.timeRange.Overlaps(min, max)
}

func (wal *Wal) Name() string {
	return wal.file.Name()
}

func (wal *Wal) EndWrite() {
	if wal != nil {
		wal.writing = false
		util.MustRun(wal.Flush)
		util.MustRun(wal.Sync)
	}
}

func (wal *Wal) open() error {
	if wal.opened {
		return nil
	}

	err := fileops.MkdirAll(wal.dir, 0700)
	if err != nil {
		return err
	}

	file := filepath.Join(wal.dir, fmt.Sprintf("%d.%s", AllocWalSeq(), walFileSuffixes))
	wal.file = NewWalFile(file, wal.lock)
	wal.file.setWalObsOptions(wal.option)

	if err = wal.file.Open(); err != nil {
		return err
	}

	if config.ShelfHotModeEnabled() {
		wal.file.OpenMemFile()
		wal.openAt = fasttime.UnixTimestamp()
		immutable.NewHotFileManager().Add(wal)
	}

	wal.opened = true
	return nil
}

func (wal *Wal) Expired() bool {
	return wal.opened && !wal.writing && wal.WrittenSize() > 0 &&
		fasttime.UnixTimestamp() >= wal.expireAt
}

func (wal *Wal) BackgroundSync() {
	if !wal.opened {
		return
	}

	err := wal.sync()
	if err != nil {
		logger.GetLogger().Error("failed to sync wal",
			zap.Error(err),
			zap.String("file", wal.file.Name()))
	}
}

func (wal *Wal) UpdateTimeRange(tr *util.TimeRange) {
	if wal.timeRange.Min > tr.Min {
		wal.timeRange.Min = tr.Min
	}
	if wal.timeRange.Max < tr.Max {
		wal.timeRange.Max = tr.Max
	}
}

func (wal *Wal) WriteRecord(sid uint64, seriesKey []byte, rec []byte) error {
	err := wal.open()
	if err != nil {
		return err
	}

	mst := util.Bytes2str(record.ReadMstFromSeriesKey(seriesKey))

	if sid == 0 {
		err = wal.writeSeriesKey(seriesKey)
		if err != nil {
			return err
		}
	}

	ofs := wal.WrittenSize()
	err = wal.writeRecord(mst, sid, rec)
	if err != nil {
		return err
	}

	if sid > 0 {
		wal.MapSeries(mst, sid)
		wal.AddSeriesOffsets(sid, ofs)
		return nil
	}
	wal.addSeriesKeyOffset(seriesKey, ofs)
	return nil
}

func (wal *Wal) MapSeries(mst string, sid uint64) {
	wal.seriesMap.Add(mst, sid)
}

func (wal *Wal) HasMeasurement(mst string) bool {
	return wal.seriesMap.HasMeasurement(mst)
}

func (wal *Wal) writeRecord(mst string, sid uint64, rec []byte) error {
	block := wal.compress(mst, sid, rec)

	wal.mu.RLock()
	defer wal.mu.RUnlock()

	_, err := wal.file.Write(block)
	return err
}

func (wal *Wal) writeSeriesKey(seriesKey []byte) error {
	buf := wal.compressSeriesKey(seriesKey)

	wal.mu.RLock()
	defer wal.mu.RUnlock()

	_, err := wal.file.Write(buf)
	return err
}

func (wal *Wal) compressSeriesKey(seriesKey []byte) []byte {
	buf := &wal.ctx.buf
	header := &wal.ctx.header

	switch conf.WalCompressMode {
	case walCompressLz4:
		var err error
		buf.Swap = slices.Grow(buf.Swap, walBlockHeaderSize)[:walBlockHeaderSize]
		buf.Swap, err = LZ4CompressBlock(seriesKey, buf.Swap[:len(buf.Swap)])
		if err != nil {
			// fault-tolerant processing without compressing data
			logger.GetLogger().Error("failed to lz4 compress block", zap.Error(err))
			break
		}

		header.Set(len(buf.Swap)-walBlockHeaderSize, 0, flagSeriesKey, walCompressLz4, 0)
		header.Put(buf.Swap)
	case walCompressSnappy:
		buf.Swap = slices.Grow(buf.Swap, walBlockHeaderSize)[:walBlockHeaderSize]
		buf.Swap = SnappyCompressBlock(seriesKey, buf.Swap[:len(buf.Swap)])
		header.Set(len(buf.Swap)-walBlockHeaderSize, 0, flagSeriesKey, walCompressSnappy, 0)
		header.Put(buf.Swap)
	default:
		break
	}

	buf.Swap = append(buf.Swap[:0], zeroBlockHeader[:]...)
	buf.Swap = append(buf.Swap, seriesKey...)
	header.Set(len(seriesKey), 0, flagSeriesKey, walCompressNone, 0)
	header.Put(buf.Swap)

	return buf.Swap
}

// | 16-byte header | N-byte measurement name | N-byte Record |
func (wal *Wal) compress(mst string, sid uint64, rec []byte) []byte {
	buf := &wal.ctx.buf
	header := &wal.ctx.header
	var err error

	switch conf.WalCompressMode {
	case walCompressLz4:
		buf.Swap = slices.Grow(buf.Swap, walBlockHeaderSize)[:walBlockHeaderSize]
		buf.Swap = append(buf.Swap, mst...)
		buf.Swap, err = LZ4CompressBlock(rec, buf.Swap[:len(buf.Swap)])
		if err != nil {
			// fault-tolerant processing without compressing data
			logger.GetLogger().Error("failed to lz4 compress block", zap.Error(err))
			break
		}

		header.Set(len(buf.Swap)-walBlockHeaderSize-len(mst), len(mst), flagRecord, walCompressLz4, sid)
		header.Put(buf.Swap)
		return buf.Swap
	case walCompressSnappy:
		buf.Swap = slices.Grow(buf.Swap, walBlockHeaderSize)[:walBlockHeaderSize]
		buf.Swap = append(buf.Swap, mst...)
		buf.Swap = SnappyCompressBlock(rec, buf.Swap[:len(buf.Swap)])
		header.Set(len(buf.Swap)-walBlockHeaderSize-len(mst), len(mst), flagRecord, walCompressSnappy, sid)
		header.Put(buf.Swap)
		return buf.Swap
	default:
		break
	}

	buf.Swap = append(buf.Swap[:0], zeroBlockHeader[:]...)
	buf.Swap = append(buf.Swap, mst...)
	buf.Swap = append(buf.Swap, rec...)
	header.Set(len(buf.Swap)-walBlockHeaderSize-len(mst), len(mst), flagRecord, walCompressNone, sid)
	header.Put(buf.Swap)
	return buf.Swap
}

func (wal *Wal) Opened() bool {
	return wal.opened
}

func (wal *Wal) Flush() error {
	if wal.backgroundFlush {
		return nil
	}

	wal.mu.Lock()
	defer wal.mu.Unlock()

	return wal.file.Flush()
}

func (wal *Wal) Sync() error {
	if wal.backgroundSync {
		return nil
	}
	return wal.sync()
}

func (wal *Wal) sync() error {
	wal.mu.Lock()
	defer wal.mu.Unlock()

	err := wal.file.Flush()
	if err != nil {
		return err
	}

	return wal.file.Sync()
}

func (wal *Wal) PopSeriesKey() ([]byte, int64, int64) {
	return wal.seriesKeyOffsets.Pop()
}

func (wal *Wal) GetAllSid(dst []uint64) []uint64 {
	return wal.seriesOffsets.GetAllKeyNoLock(dst)
}

func (wal *Wal) MustClose() {
	if !wal.opened {
		return
	}
	util.MustRun(wal.sync)
	wal.closeOnce.Do(func() {
		util.MustClose(wal.file)
	})
}

func (wal *Wal) Clean() {
	lockOpt := fileops.FileLockOption(*wal.lock)
	util.MustRun(func() error {
		return fileops.RemoveAll(wal.file.Name(), lockOpt)
	})
}

func (wal *Wal) PreLoad(name string) {
	wal.file = NewWalFile(name, wal.lock)
	wal.file.setWalObsOptions(wal.option)
}

func (wal *Wal) LoadFromDisk() error {
	if wal.opened {
		wal.file.LoadIntoMemory()
		return nil
	}

	if err := wal.file.OpenReadonly(); err != nil {
		return err
	}

	wal.opened = true
	wal.file.LoadIntoMemory()

	wal.loadWal()

	return nil
}

// Reload the WAL file upon process startup
func (wal *Wal) loadWal() {
	header := &wal.ctx.header
	headBuf := make([]byte, walBlockHeaderSize)
	var mstBuf []byte
	var err error
	var offset int64 = 0
	fd := wal.file
	var seriesKey []byte

	for {
		_, err = fd.ReadFull(headBuf[:])
		if err == io.EOF {
			break
		}
		if err != nil {
			logger.GetLogger().Error("failed to read ext block", zap.Error(err))
			break
		}

		err = header.Unmarshal(headBuf)
		if err != nil {
			logger.GetLogger().Error("failed to unmarshal ext block", zap.Error(err))
			break
		}

		if header.mstLen > 0 {
			mstBuf = slices.Grow(mstBuf[:0], int(header.mstLen))[:header.mstLen]
			_, err = fd.ReadFull(mstBuf)
			if err != nil {
				logger.GetLogger().Error("failed to read measurement name", zap.Error(err))
				break
			}
		}

		if header.typeFlag == flagSeriesKey {
			// block series key
			seriesKey, err = readSeriesKey(seriesKey, header, fd)
			if err != nil {
				logger.GetLogger().Error("failed to read series key", zap.Error(err))
				break
			}

			// next block offset
			offset += int64(walBlockHeaderSize) + int64(header.size)
		} else {
			if header.sid > 0 {
				wal.seriesMap.Add(util.Bytes2str(mstBuf), header.sid)
				wal.AddSeriesOffsets(header.sid, offset)
			} else if len(seriesKey) > 0 {
				wal.addSeriesKeyOffset(seriesKey, offset)
				seriesKey = seriesKey[:0]
			}

			// next block offset
			offset += int64(walBlockHeaderSize) + int64(header.size) + int64(header.mstLen)
		}

		_, err = fd.Seek(offset, io.SeekStart)
		if err != nil {
			logger.GetLogger().Error("failed to seek offset", zap.Error(err))
			break
		}
	}
}

func (wal *Wal) AddSeriesOffsets(sid uint64, offsets int64) {
	wal.seriesOffsets.Add(sid, offsets)
}

func (wal *Wal) addSeriesKeyOffset(seriesKey []byte, offset int64) {
	wal.seriesKeyOffsets.Add(seriesKey, offset)
}

func (wal *Wal) AddTargetTSSPFiles(files ...immutable.TSSPFile) {
	for _, file := range files {
		_, seq := file.LevelAndSequence()
		wal.targetMu.Lock()
		wal.targetTSSP[seq] = struct{}{}
		wal.targetMu.Unlock()
	}
}

func (wal *Wal) TargetContain(files ...immutable.TSSPFile) bool {
	if len(wal.targetTSSP) == 0 {
		return false
	}

	for _, file := range files {
		_, seq := file.LevelAndSequence()
		wal.targetMu.RLock()
		_, ok := wal.targetTSSP[seq]
		wal.targetMu.RUnlock()
		if ok {
			return true
		}
	}

	return false
}

func (wal *Wal) FreeMemory() {
	wal.file.FreeMemory()
}

func (wal *Wal) InMemSize() int64 {
	return wal.file.InMemSize()
}

func (wal *Wal) MinMaxTime() (int64, int64, error) {
	ns := int64(wal.openAt) * time.Second.Nanoseconds()
	return ns, ns, nil
}

type WalBlockHeader struct {
	size         uint32 // length of block
	mstLen       uint16 // length of measurement name
	typeFlag     uint8  // block data type
	compressFlag uint8  // block compression algorithm
	sid          uint64
}

func (h *WalBlockHeader) Set(size int, mstLen int, typeFlag, compressFlag uint8, sid uint64) {
	h.size = uint32(size)
	h.mstLen = uint16(mstLen)
	h.typeFlag = typeFlag
	h.compressFlag = compressFlag
	h.sid = sid
}

func (h *WalBlockHeader) Put(dst []byte) {
	if cap(dst) < walBlockHeaderSize {
		return
	}

	dst = dst[:walBlockHeaderSize]

	v := uint64(h.size)<<32 | uint64(h.mstLen)<<16 | uint64(h.typeFlag)<<8 | uint64(h.compressFlag)
	binary.BigEndian.PutUint64(dst, v)
	binary.BigEndian.PutUint64(dst[util.Uint64SizeBytes:], h.sid)
}

func (h *WalBlockHeader) Unmarshal(src []byte) error {
	if len(src) < walBlockHeaderSize {
		return fmt.Errorf("too few bytes for WalBlockHeader")
	}

	v := binary.BigEndian.Uint64(src)
	h.size = uint32(v >> 32)
	h.mstLen = uint16(v >> 16)
	h.typeFlag = uint8(v >> 8)
	h.compressFlag = uint8(v)
	h.sid = binary.BigEndian.Uint64(src[util.Uint64SizeBytes:])
	return nil
}

type Offsets struct {
	ofs []int64
}

func (o *Offsets) Add(v int64) {
	o.ofs = append(o.ofs, v)

	// Asynchronous index creation may cause the offset to become unordered
	// The offset size represents the order in which data is written
	size := len(o.ofs)
	if size > 1 && o.ofs[size-1] < o.ofs[size-2] {
		slices.Sort(o.ofs)
	}
}

var seriesMemSize atomic.Int64

type SeriesKeyOffsets struct {
	mu      sync.RWMutex
	keys    [][]byte
	offsets []int64

	// measure the latency for asynchronous index creation to determine data visibility
	ts    []int64
	count int
	swap  []byte
}

func NewSeriesKeyOffsets() *SeriesKeyOffsets {
	return &SeriesKeyOffsets{}
}

func (o *SeriesKeyOffsets) Add(key []byte, ofs int64) {
	ts := time.Now().UnixNano()
	k := o.compressKey(key)
	seriesMemSize.Add(int64(len(k)))

	o.mu.Lock()
	defer o.mu.Unlock()

	o.keys = append(o.keys, k)
	o.offsets = append(o.offsets, ofs)
	o.ts = append(o.ts, ts)
	o.count++
}

func (o *SeriesKeyOffsets) Pop() ([]byte, int64, int64) {
	key, ofs, ts := o.pop()

	if len(key) == 0 {
		return nil, 0, 0
	}

	if key[0] == walCompressNone {
		return key[1:], ofs, ts
	}

	dk, err := LZ4DecompressBlock(key[1:], nil)
	if err != nil {
		logger.GetLogger().Error("[BUG] failed to decompress series key", zap.Error(err))
		dk = nil
	}

	return dk, ofs, ts
}

func (o *SeriesKeyOffsets) pop() ([]byte, int64, int64) {
	o.mu.Lock()
	defer o.mu.Unlock()

	size := len(o.keys)
	if size == 0 {
		return nil, 0, 0
	}

	// FIFO
	key, ofs, ts := o.keys[0], o.offsets[0], o.ts[0]
	o.keys = o.keys[1:size:cap(o.keys)]
	o.offsets = o.offsets[1:size:cap(o.offsets)]
	o.ts = o.ts[1:size:cap(o.ts)]
	o.count = size - 1
	seriesMemSize.Add(-int64(len(key)))

	return key, ofs, ts
}

func (o *SeriesKeyOffsets) compressKey(key []byte) []byte {
	if o.needCompress() {
		var err error
		o.swap, err = LZ4CompressBlock(key, o.swap[:0])
		if err == nil {
			k := make([]byte, len(o.swap)+1)
			k[0] = walCompressLz4
			copy(k[1:], o.swap)
			return k
		}
		logger.GetLogger().Error("LZ4 compress failed", zap.Error(err))
		// Compression failed, downgraded to no compression
	}

	k := make([]byte, len(key)+1)
	k[0] = walCompressNone
	copy(k[1:], key)
	return k
}

func (o *SeriesKeyOffsets) needCompress() bool {

	return conf.SeriesKeyCompressThreshold > 0 && seriesMemSize.Load() >= int64(conf.SeriesKeyCompressThreshold)
}

func (o *SeriesKeyOffsets) Len() int {
	return o.count
}

type SeriesOffsets struct {
	mu   sync.RWMutex
	data map[uint64]*Offsets
}

func NewSeriesOffsets() *SeriesOffsets {
	return &SeriesOffsets{data: make(map[uint64]*Offsets)}
}

func (o *SeriesOffsets) Add(key uint64, ofs int64) {
	o.mu.Lock()
	defer o.mu.Unlock()

	offsets, ok := o.data[key]
	if !ok {
		offsets = &Offsets{}
		o.data[key] = offsets
	}
	offsets.Add(ofs)
}

func (o *SeriesOffsets) Get(sid uint64, dst []int64) []int64 {
	o.mu.RLock()
	defer o.mu.RUnlock()

	offsets, ok := o.data[sid]
	if !ok {
		return dst
	}

	dst = append(dst, offsets.ofs...)
	return dst
}

func (o *SeriesOffsets) GetAllKeyNoLock(dst []uint64) []uint64 {
	for sid := range o.data {
		dst = append(dst, sid)
	}
	return dst
}

func LZ4CompressBlock(src, dst []byte) ([]byte, error) {
	bound := lz4.CompressBlockBound(len(src)) +
		util.Uint32SizeBytes // extra 4 bytes record the length of the original data

	size := len(dst)
	dst = slices.Grow(dst, bound)

	n, err := lz4.CompressBlock(src, dst[size+util.Uint32SizeBytes:size+bound])
	if err == nil {
		binary.BigEndian.PutUint32(dst[size:size+util.Uint32SizeBytes], uint32(len(src)))
		return dst[:n+size+util.Uint32SizeBytes], nil
	}

	return nil, err
}

func LZ4DecompressBlock(src, dst []byte) ([]byte, error) {
	size := int(binary.BigEndian.Uint32(src))
	dst = slices.Grow(dst[:0], size)
	_, err := lz4.DecompressSafe(src[util.Uint32SizeBytes:], dst[:size])
	if err != nil {
		return nil, err
	}
	return dst[:size], nil
}

func SnappyCompressBlock(src, dst []byte) []byte {
	bound := snappy.MaxEncodedLen(len(src))

	size := len(dst)
	dst = slices.Grow(dst, bound)

	enc := snappy.Encode(dst[size:size+bound], src)
	return dst[:len(enc)+size]
}

func SnappyDecompressBlock(src, dst []byte) ([]byte, error) {
	return snappy.Decode(dst, src)
}

type SeriesMap struct {
	mu   sync.RWMutex
	data map[string]map[uint64]struct{}
}

func NewSeriesMap() *SeriesMap {
	return &SeriesMap{
		data: make(map[string]map[uint64]struct{}),
	}
}

func (s *SeriesMap) Add(mst string, sid uint64) {
	s.mu.Lock()
	defer s.mu.Unlock()

	series, ok := s.data[mst]
	if !ok {
		series = make(map[uint64]struct{})
		s.data[strings.Clone(mst)] = series
	}
	series[sid] = struct{}{}
}

func (s *SeriesMap) IsEmpty() bool {
	s.mu.RLock()
	ret := len(s.data) == 0
	s.mu.RUnlock()
	return ret
}

func (s *SeriesMap) HasMeasurement(mst string) bool {
	s.mu.RLock()
	_, ok := s.data[mst]
	s.mu.RUnlock()
	return ok
}

func (s *SeriesMap) Walk(fn func(mst string, series map[uint64]struct{})) {
	for mst, series := range s.data {
		fn(mst, series)
	}
}

type WalFileHot struct {
	memFile *fileops.MemFile
}

func (wf *WalFileHot) OpenMemFile() {
	// The actual amount of data to be written can be greater than the value of conf.MaxWalFileSize.
	// For example, when the remaining capacity is only 4 KB, the actual data to be written is 8 KB.
	// In this case, data can still be written.
	// Therefore, a 256 KB buffer is reserved here.
	wf.memFile = fileops.NewMemFile(int64(conf.MaxWalFileSize) + config.KB*256)
}

func (wf *WalFileHot) FreeMemory() {
	immutable.NewHotFileManager().IncrMemorySize(-wf.InMemSize())
	wf.memFile = nil
}

func (wf *WalFileHot) InMemSize() int64 {
	if mf := wf.memFile; mf != nil {
		return mf.MaxSize()
	}
	return 0
}

func readSeriesKey(dst []byte, header *WalBlockHeader, fd *WalFile) ([]byte, error) {
	dst = slices.Grow(dst[:0], int(header.size))[:header.size]
	_, err := fd.ReadFull(dst)
	if err != nil {
		return nil, err
	}

	switch header.compressFlag {
	case walCompressLz4:
		swap := bufferpool.Get()
		swap, err = LZ4DecompressBlock(dst, swap)
		bufferpool.Put(dst)
		return swap, err
	case walCompressSnappy:
		swap := bufferpool.Get()
		swap, err = SnappyDecompressBlock(dst, swap)
		bufferpool.Put(dst)
		return swap, err
	default:
		break
	}
	return dst, err
}
